First, we parse the connection details from the vcap.json file.
In [5]:
import scala.io.Source
import play.api.libs.json._
val json = Json.parse(Source.fromFile("vcap.json").getLines().mkString)
// kafka_brokers_sasl is expected to be a JSON array of "host:port" strings;
// join the entries into the comma-separated list that Kafka expects
val bootstrap_servers = (json \ "kafka_brokers_sasl").as[Seq[String]].mkString(",")
// Read the remaining fields as plain strings (no manual quote stripping needed)
val username = (json \ "user").as[String]
val password = (json \ "password").as[String]
val topic = (json \ "topic").as[String]
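A quick sanity check on the broker string can catch a malformed vcap.json early. The sketch below is not part of the original notebook: `BrokerCheck` and `invalidBrokers` are hypothetical names, the host names are made up, and the check only assumes that each entry should look like `host:port`.

```scala
// Hypothetical helper, not part of the notebook: splits a comma-separated
// broker string and reports entries that do not look like host:port.
object BrokerCheck {
  private val HostPort = """^([A-Za-z0-9.\-]+):(\d{1,5})$""".r

  /** Returns the entries that fail the host:port check (empty means all look fine). */
  def invalidBrokers(servers: String): Seq[String] =
    servers.split(",").map(_.trim).toList.filter {
      case HostPort(_, port) => port.toInt > 65535 // port out of range
      case _                 => true               // not host:port at all
    }
}

// Made-up sample values, for illustration only
val sampleServers = "broker-0.example.com:9093,broker-1.example.com:9093"
println(BrokerCheck.invalidBrokers(sampleServers))        // prints List()
println(BrokerCheck.invalidBrokers("broker-0:9093,oops")) // prints List(oops)
```

In the notebook, the same call could be made on `bootstrap_servers`. An empty result only says the string is well formed, not that the brokers are reachable.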
If the Message Hub consumer does not work, ensure your connection details are correct. Uncomment the code below to verify.
In [15]:
println("Connection details:")
println(bootstrap_servers)

// The blank line above keeps this block from being parsed as an argument to println
{
  /*
  println(username)
  println(password)
  println(topic)
  */
}
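Printing credentials in a notebook leaves them in the saved cell output. As a possible alternative, the sketch below masks most of each value before printing. The `mask` helper and the sample values are hypothetical and not part of the original notebook.

```scala
// Hypothetical helper, not part of the notebook: keeps the first `visible`
// characters of a secret and replaces the rest with asterisks.
def mask(secret: String, visible: Int = 2): String = {
  val shown = secret.take(math.max(visible, 0))
  shown + "*" * (secret.length - shown.length)
}

// Made-up values for illustration; in the notebook these would be the
// username and password parsed from vcap.json.
val sampleUser = "abc123"
val samplePassword = "s3cretPassw0rd"
println(s"username: ${mask(sampleUser)}")                  // username: ab****
println(s"password: ${mask(samplePassword, visible = 0)}") // password: **************
```

Seeing the first couple of characters and the length is often enough to tell whether the right value was picked up from the file.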
Next, we connect to Kafka. The next cell runs indefinitely until you stop or interrupt the kernel.
After starting the cell, open the Step 4 notebook in another tab and send some data to Message Hub, then return to the output under the cell below to see the data displayed by this consumer.
Note that it can take about 60 seconds before the sent data is printed by this consumer, because the streaming context below uses a 60-second batch interval.
In [ ]:
import net.christophersnow.config.MessageHubConfig
import net.christophersnow.dstream.KafkaStreaming.KafkaStreamingContextAdapter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaProps = new MessageHubConfig
kafkaProps.setConfig("bootstrap.servers", bootstrap_servers)
kafkaProps.setConfig("kafka.user.name", username)
kafkaProps.setConfig("kafka.user.password", password)
kafkaProps.setConfig("kafka.topic", topic)
kafkaProps.createConfiguration()
// Process incoming messages in 60-second batches
val ssc = new StreamingContext(sc, Seconds(60))
val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
  kafkaProps,
  List(kafkaProps.getConfig("kafka.topic"))
)
stream.foreachRDD { rdd =>
  // only create an output folder in HDFS if the batch contains data
  if (!rdd.isEmpty()) {
    val uuid = java.util.UUID.randomUUID.toString
    val outDir = s"test-${uuid}"
    rdd.saveAsTextFile(outDir)
  }
}
// Print records from each batch to the cell output
stream.print()
// Start the job and block until the kernel is stopped or interrupted
ssc.start()
ssc.awaitTermination()
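The cell above writes each non-empty batch to a directory named with a random UUID, so the directory names carry no ordering information. One possible variation, sketched here under assumptions, is to derive the name from the batch time instead. The `batchDir` helper is hypothetical. The commented usage relies on the `foreachRDD` overload that also passes the batch `Time`, and it has not been run against Message Hub.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical helper, not part of the notebook: builds a directory name
// from a prefix and a batch time given in epoch milliseconds (UTC).
def batchDir(prefix: String, batchTimeMs: Long): String = {
  val fmt = new SimpleDateFormat("yyyyMMdd-HHmmss")
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  s"$prefix-${fmt.format(new Date(batchTimeMs))}"
}

println(batchDir("test", 0L)) // prints test-19700101-000000

// Assumed usage inside the streaming job (needs the `stream` from the cell above):
// stream.foreachRDD { (rdd, time) =>
//   if (!rdd.isEmpty()) rdd.saveAsTextFile(batchDir("test", time.milliseconds))
// }
```

With a 60-second batch interval, each batch maps to a distinct name, and listing the directories in name order gives them in arrival order.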